push down min/max/count to iceberg#6252

Closed
huaxingao wants to merge 5 commits into apache:master from huaxingao:agg_push_down

Conversation


@huaxingao huaxingao commented Nov 23, 2022

This PR pushes down min/max/count to iceberg.

For SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table, without this PR, iceberg will do SELECT col FROM table, and Spark will calculate MIN(col), MAX(col), COUNT(col), COUNT(*). With this PR, iceberg will do SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table. MIN, MAX, COUNT will be calculated on iceberg side using the statistics info in the manifest file.

This PR makes the following changes:

  1. Add a SparkSQL property AGGREGATE_PUSH_DOWN_ENABLED. The default is true.
  2. Make SparkScanBuilder implement SupportsPushDownAggregates, so MIN/MAX/COUNT can be pushed down to iceberg, and then iceberg will read the statistics information (upper_bound, lower_bound, record_count) from manifest file, calculate the MIN/MAX/COUNT, build a Spark InternalRow and pass the InternalRow to Spark.
  3. The push-down logic is decided in SparkScanBuilder.pushAggregation. If any of the aggregates can't be pushed down, e.g. because upper_bound, lower_bound, or record_count are not available, we will fall back.
  4. Add a SparkLocalScan. It is a special Scan that happens locally on the Spark driver instead of on executors. If MIN/MAX/COUNT are pushed down, iceberg creates a SparkLocalScan and then doesn't need to plan files, create FileScanTasks, and send the tasks to executors. Instead, iceberg can just do a local scan on the Spark driver: max, min, or count is calculated from the statistics info, and an InternalRow is built and returned to Spark.
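The per-file statistics idea behind changes 2 and 4 can be sketched in plain Java (the `FileStats` holder and its field names are illustrative, not Iceberg API): given each data file's decoded lower_bound, upper_bound, and record_count, the table-level MIN/MAX/COUNT fall out of a single pass over the manifest entries, with no executor work.

```java
import java.util.List;

public class StatsAggregation {
    // Illustrative holder for one data file's decoded column statistics.
    public static final class FileStats {
        public final long lowerBound;   // decoded lower_bound for the column
        public final long upperBound;   // decoded upper_bound for the column
        public final long recordCount;  // record_count for the file
        public FileStats(long lowerBound, long upperBound, long recordCount) {
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
            this.recordCount = recordCount;
        }
    }

    // Fold per-file stats into {min, max, count} for the whole table.
    public static long[] aggregate(List<FileStats> files) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long count = 0;
        for (FileStats f : files) {
            min = Math.min(min, f.lowerBound);
            max = Math.max(max, f.upperBound);
            count += f.recordCount;
        }
        return new long[] {min, max, count};
    }

    public static void main(String[] args) {
        List<FileStats> files = List.of(
            new FileStats(3, 40, 100),
            new FileStats(-7, 25, 50));
        long[] r = aggregate(files);
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // -7 40 150
    }
}
```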

In the tests, the EXPLAIN output of the physical plan is checked to see whether MIN/MAX/COUNT are pushed down.

For example, consider SELECT max(data), min(data) FROM table.

If MIN/MAX/COUNT are not pushed down:

== Optimized Logical Plan ==
Aggregate [max(data#146) AS max(data)#150, min(data#146) AS min(data)#151, count(data#146) AS count(data)#152L]
+- RelationV2[data#146] spark_catalog.default.table

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max(data#146), min(data#146), count(data#146)], output=[max(data)#150, min(data)#151, count(data)#152L])
   +- SortAggregate(key=[], functions=[partial_max(data#146), partial_min(data#146), partial_count(data#146)], output=[max#165, min#166, count#167L])
      +- BatchScan[data#146] spark_catalog.default.table [filters=] RuntimeFilters: []

If MIN/MAX/COUNT are pushed down:

== Optimized Logical Plan ==
Aggregate [max(MAX(data)#440) AS max(data)#429, min(MIN(data)#441) AS min(data)#430, sum(COUNT(data)#442L) AS count(data)#431L]
+- RelationV2[MAX(data)#440, MIN(data)#441, COUNT(data)#442L] spark_catalog.default.table

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max(MAX(data)#440), min(MIN(data)#441), sum(COUNT(data)#442L)], output=[max(data)#429, min(data)#430, count(data)#431L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#297]
      +- SortAggregate(key=[], functions=[partial_max(MAX(data)#440), partial_min(MIN(data)#441), partial_sum(COUNT(data)#442L)], output=[max#446, min#447, sum#448L])
         +- LocalTableScan [MAX(data)#440, MIN(data)#441, COUNT(data)#442L]

The physical plan is checked to see if it contains MAX(data)/MIN(data)/COUNT(data).

@huaxingao
Contributor Author

The original PR is here

Dataset<Row> metadataRows =
    SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.DATA_FILES);
Dataset<Row> dataset =
    metadataRows.selectExpr(
        "lower_bounds", "upper_bounds", "record_count", "null_value_counts");
Contributor Author

I have thought about using something like

metadataRows.select(functions.max(functions.col("upper_bounds.1")))

but the values in the statistics such as upper_bounds are byte arrays, so I don't think I can get the correct max/min values that way.
For example, if I have 3.333 and 5.555, the corresponding values in the statistics are [-33, 79, 83, 64] and [-113, -62, -79, 64]. If I take the max of the byte arrays, I get [-33, 79, 83, 64], which is 3.333. That's why I select lower_bounds and upper_bounds and compute the max/min myself.
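The byte-array problem can be demonstrated with a small self-contained sketch (plain Java, no Iceberg dependency): floats serialized little-endian do not sort correctly as raw bytes, so the bounds must be decoded before comparing.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class BoundBytes {
    // Serialize a float as 4 little-endian bytes, as in the stored bounds.
    public static byte[] toBytes(float f) {
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putFloat(f).array();
    }

    // Decode the stored bytes back to a float.
    public static float fromBytes(byte[] b) {
        return ByteBuffer.wrap(b).order(ByteOrder.LITTLE_ENDIAN).getFloat();
    }

    public static void main(String[] args) {
        byte[] a = toBytes(3.333f);
        byte[] b = toBytes(5.555f);
        // Byte-wise comparison says a > b, so a byte-level "max" wrongly picks 3.333f.
        System.out.println(Arrays.compare(a, b) > 0);             // true
        // Decoding first gives the correct numeric max.
        System.out.println(Math.max(fromBytes(a), fromBytes(b))); // 5.555
    }
}
```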


public static Row[] getStatisticRow(SparkSession spark, Table table) {
  Dataset<Row> metadataRows =
      SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.DATA_FILES);
Contributor Author

#5872 (comment)
When pushing down aggregates, we don't need to do job planning because we only need a LocalScan on the Spark driver. I guess maybe we don't need a flag to enable/disable running a parallel operation from within job planning?

* MAX(col2), Types.StringType.get()) Types.NestedField.required(5, MIN(col3),
* Types.StringType.get())
*/
public static Types.NestedField getAggregateNestedField(Expression expr, int index) {
Contributor

Style: Iceberg does not use get in method names. It can either be omitted or replaced with a more descriptive verb, like build or find.

  return Types.NestedField.required(index, "COUNT(*)", Types.LongType.get());
} else if (op() == Operation.COUNT) {
  return Types.NestedField.required(
      index, "COUNT(" + term().ref().name() + ")", Types.LongType.get());
Contributor

These strings aren't correct because there are two types of terms. A term can be a reference (e.g., a column named id) or a transformed reference (e.g. bucket(16, id)).

The ref() method is convenient for getting the underlying reference, but this can't just get the ref from the term and assume that the term is a ref.

Contributor Author

@rdblue Thanks for your comment.

It seems to me that we can't aggregate on a transformed reference.
I currently have

public class BoundAggregate<T, C> extends Aggregate<BoundTerm<T>> implements Bound<C>

Should I change to the following?

public class BoundAggregate<T, C> extends Aggregate<BoundReference<T>> implements Bound<C>

Contributor

I would not change the definition. Just catch that case and throw an exception.

Contributor

I was able to get transformed values working in #6405 by calling term().eval(struct).

return ExpressionVisitors.visit(expr, new AggregateColumnIndexVisitor(table, caseSensitive));
}

private static class AggregateNestedFieldVisitor
Contributor

I don't think that visitors are what you want for these methods. Visitors are used to transform trees. This appears to be trying to call a method using a visitor to cast the input. Since there are a lot of cases these visitors don't handle, their use is very narrow and probably not something you want to use a visitor for.

Contributor Author

I removed the visitors.

I am not sure if I should implement this eval. It seems easier to get the values directly.

Contributor

The approach I took for this is to use BoundAggregate.eval to produce individual values from a row or data file. Then an Aggregator is used to aggregate those values. It seems to fit fairly cleanly.
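A minimal sketch of that eval-then-aggregate pattern (the names below are illustrative, not the actual API introduced in #6405): an expression produces one value per row or data file, and a small Aggregator folds those values into the final result.

```java
import java.util.List;

public class AggregatorSketch {
    // Folds a stream of evaluated values into one result.
    public interface Aggregator<T, R> {
        void update(T value);
        R result();
    }

    public static final class MaxAggregator implements Aggregator<Integer, Integer> {
        private Integer max = null;

        @Override
        public void update(Integer value) {
            if (value != null && (max == null || value > max)) {
                max = value;
            }
        }

        @Override
        public Integer result() {
            return max;
        }
    }

    // Each element of `evaluated` stands in for the value an eval(...) call
    // would produce from one row or data file.
    public static Integer maxOf(List<Integer> evaluated) {
        MaxAggregator agg = new MaxAggregator();
        for (Integer v : evaluated) {
            agg.update(v);
        }
        return agg.result();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(List.of(3, 7, 5))); // 7
    }
}
```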


public static final String TIMESTAMP_AS_OF = "timestampAsOf";

public static final String AGGREGATE_PUSH_DOWN_ENABLED = "aggregatePushdown";
Contributor

Enabled?

@rdblue
Contributor

rdblue commented Dec 12, 2022

@huaxingao, as I looked at this, the main thing that I think we should change is moving most of the logic into core or API so that it can be reused across query engines. I tried to do that and opened a PR for it here: #6405

Please take a look!

That creates a public AggregateEvaluator that produces the aggregate row type and a row of aggregated values. If anything is missing, the values are null. I think this could be cleanly wrapped in Spark. What do you think?

Map<String, String> map = currentSnapshot.summary();
// if there are row-level deletes in the current snapshot, the statistics
// may have changed, so disable aggregate push down.
if (Integer.parseInt(map.getOrDefault("total-position-deletes", "0")) > 0
Member

I think we can be a bit more specific here: we only have to disable this if there are position deletes that apply to this particular scan. So for example, if there are delete files but they only apply to data files that would not be considered by this scan, we can still push down the aggregates.

Contributor Author

@RussellSpitzer Thank you very much for your review! I will fix this.
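For reference, the snapshot-level check under discussion can be sketched with plain Java. The property keys are Iceberg snapshot-summary names, but the method itself is illustrative rather than the PR's exact code.

```java
import java.util.Map;

public class DeleteCheck {
    // Returns true if the snapshot summary reports any row-level deletes,
    // in which case file statistics may be stale and aggregate push down
    // should be skipped.
    public static boolean hasRowLevelDeletes(Map<String, String> summary) {
        long positionDeletes = Long.parseLong(summary.getOrDefault("total-position-deletes", "0"));
        long equalityDeletes = Long.parseLong(summary.getOrDefault("total-equality-deletes", "0"));
        return positionDeletes > 0 || equalityDeletes > 0;
    }

    public static void main(String[] args) {
        System.out.println(hasRowLevelDeletes(Map.of("total-position-deletes", "3"))); // true
        System.out.println(hasRowLevelDeletes(Map.of()));                              // false
    }
}
```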

List<Types.NestedField> columns = table.schema().columns();
for (int i = 0; i < columns.size(); i++) {
  if (aggregate.columnName().equals("*")) {
    return -1;
Member

Maybe the magic value here needs to be a constant somewhere? I see we don't have it defined in the doc.

Contributor Author

I will need to make quite a few changes after @rdblue merges his #6405. I won't need this AggregateUtil any more.

 * Returns the column name this aggregate function is on. E.g., for SELECT MAX(col3) FROM table,
 * this method returns col3.
 */
public static String getAggregateColumnName(BoundAggregate aggregate) {
Member

I'm not sure we need all the documentation for these static methods which are just wrappers around object methods. Do we really need all these helpers?

@Override
public C eval(StructLike struct) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement eval");
throw new UnsupportedOperationException(
Member

just a formatting change here?

Contributor Author

I think this is reformatted by spotlessApply

@huaxingao
Contributor Author

@rdblue I am still waiting for your #6405 to get merged first so I can continue this PR :)

If you are too busy, perhaps I could take over your PR?

@rdblue
Contributor

rdblue commented Jan 18, 2023

@huaxingao, I was hoping that you would take over the PR. I'm sorry that I missed your note saying that you were waiting for me to get it in! Please incorporate those changes into yours.

@huaxingao
Contributor Author

@rdblue

I was hoping that you would take over the PR

Will do. Thank you very much for your reply!

@atifiu

atifiu commented Oct 10, 2023

@huaxingao I was executing a max/count query on an Iceberg table (version 1.3.0) with Spark 3.3.1 but was unable to see aggregate pushdown, i.e. a LocalTableScan.

Cc: @RussellSpitzer

spark.sql(f""" select max(page_view_dtm) from schema.table1 where page_view_dtm between '2020-01-01 00:00:00' and '2021-12-31 23:59:59' """).explain()

and explain plan generated is

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(page_view_dtm#139)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=62]
      +- HashAggregate(keys=[], functions=[partial_max(page_view_dtm#139)])
         +- Filter ((page_view_dtm#139 >= 2020-01-01 00:00:00) AND (page_view_dtm#139 <= 2021-12-31 23:59:59))
            +- BatchScan[page_view_dtm#139] spark_catalog.schema.table1(branch=null) [filters=page_view_dtm IS NOT NULL, page_view_dtm >= 1577854800000000, page_view_dtm <= 1641013199000000, groupedBy=] RuntimeFilters: []

@huaxingao
Contributor Author

huaxingao commented Oct 11, 2023 via email

@atifiu

atifiu commented Oct 11, 2023

@huaxingao Thanks for your response. Even in the case of max on a non-filter column, aggregate pushdown is not working.

In the explain plan below, the partition is defined on initial_page_view_dtm and I am filtering on that same column. For this table, which is relatively small, aggregate pushdown works without a filter, but as soon as I add a filter it does not work. For a large table, aggregate pushdown is not working at all, and it gives this message:

SparkScanBuilder: Skipping aggregate pushdown: detected row level deletes

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[max(pageviewdate#465)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=86]
      +- HashAggregate(keys=[], functions=[partial_max(pageviewdate#465)])
         +- Project [pageviewdate#465]
            +- Filter (initial_page_view_dtm#468 >= 2023-01-01 00:00:00)
               +- BatchScan[pageviewdate#465, initial_page_view_dtm#468] spark_catalog.schema.table1 (branch=null) [filters=initial_page_view_dtm IS NOT NULL, initial_page_view_dtm >= 1672549200000000, groupedBy=] RuntimeFilters: []

@RussellSpitzer
Member

@atifiu Pushdown cannot happen if there are row level deletes as indicated in that log line. Row level deletes mean the file statistics are not accurate so they cannot be used for finding aggs.

@atifiu

atifiu commented Oct 11, 2023

@RussellSpitzer What can be done to resolve it? Will rewriting the data files resolve it?

@RussellSpitzer
Member

You would need to remove all delete files from the snapshot. I think this currently requires a rewrite data files + rewrite delete files

@atifiu

atifiu commented Oct 11, 2023

I am pretty sure that I don't have any delete files because I have defined copy-on-write for update, merge, and delete.

@amogh-jahagirdar
Contributor

Huh, that is strange; if you are hitting this log line and have CoW defined, I don't see how that can be possible. @atifiu do you mind creating a new issue and including these details? As a sanity check, could you check your format-version table property? If it really is CoW this should be 1.

@atifiu

atifiu commented Oct 12, 2023

@amogh-jahagirdar The format version defined is 2, and I have explicitly defined copy-on-write for delete, update, and merge. I have deleted some partitions and noticed that the snapshot metadata table shows 159 delete files, which is quite a surprise to me. I am trying to rewrite the delete files, but it's throwing an error for me.

@amogh-jahagirdar
Contributor

amogh-jahagirdar commented Oct 12, 2023

My mistake, yes you can have format version 2 and have copy on write mode. The remaining issue is why you are even seeing delete files if CoW is set. That seems to be the fundamental issue here. I'll try and repro that.

@atifiu

atifiu commented Oct 13, 2023

@huaxingao What can be the possible reasons for aggregate pushdown not working when using filters? If you can give me some idea/hint, I will try to look into it further.

@huaxingao huaxingao deleted the agg_push_down branch October 13, 2023 14:53
@huaxingao
Contributor Author

@atifiu File statistics are not accurate and can't be used any more if you use filters.

For example, you have table (col int), the max of col is 100, and the min is 0, so the statistics are

max       min
100        0

If you have SELECT MAX(col) FROM table, we can check the statistics and simply return 100. But if you have SELECT MAX(col) FROM table WHERE col < 70, we can't use the statistics any more. We only know that MAX(col) is smaller than 70, but we have no idea what value it is, so we have to compute it.
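The point can be illustrated with a tiny self-contained sketch (hypothetical data): the stored max answers the unfiltered query directly, but a residual predicate like col < 70 forces recomputation from the data itself.

```java
import java.util.Arrays;

public class FilteredMax {
    // Unfiltered max: this is exactly what the file statistics already store.
    public static int max(int[] col) {
        return Arrays.stream(col).max().getAsInt();
    }

    // Filtered max: not derivable from the stored max, must look at the values.
    public static int maxUnder(int[] col, int bound) {
        return Arrays.stream(col).filter(v -> v < bound).max().getAsInt();
    }

    public static void main(String[] args) {
        int[] col = {0, 30, 69, 100};
        System.out.println(max(col));          // 100 -- answerable from stats alone
        System.out.println(maxUnder(col, 70)); // 69  -- stats only say it is < 70
    }
}
```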

@atifiu

atifiu commented Oct 13, 2023

@huaxingao So you mean that with filters, whether on partitioned or non-partitioned column(s), aggregate pushdown will not work?

@huaxingao
Contributor Author

If filters are on partitioned columns, aggregate pushdown should work.

@atifiu

atifiu commented Oct 13, 2023

It's not working, either with between, >, or <.

@huaxingao
Contributor Author

@atifiu I suspect your partition filter isn't completely pushed down. In this PR, we discard filters that can be completely evaluated using partition metadata. Could you check the EXPLAIN output and see if there is still a Filter? If there is, then the partition filter is somehow not completely pushed down and Spark has to filter again; the statistics are not accurate in that case, so we can't push down aggregates.

@atifiu

atifiu commented Oct 14, 2023

@huaxingao So, unless we get the message "Evaluating completely on Iceberg", which means full filter pushdown happened, the filter pushdown is partial or not happening at all. In my case I am filtering on a partitioned column with a day transformation on a timestamp data type. But I am querying data based on the local timezone while partitioning is defined on UTC, which means partial pushdown will take place. This might be the reason aggregate pushdown is not working with filters. I still have to validate this in the logs, but is my understanding correct?

@huaxingao
Contributor Author

@atifiu That's right. If the filters are not completely evaluated on the Iceberg side, then Spark has to evaluate them, and the Min/Max/Count values might change after the filtering. Therefore, we can't rely on the statistics for Min/Max/Count.
I also suspect that the timezone could be the reason the filter isn't fully pushed down, but I can't be certain without examining the details.

@atifiu

atifiu commented Oct 16, 2023

@huaxingao Based on your suggestion, I have narrowed the filter criteria so that, even considering the timezone problem, we don't filter on more than two partitions (so the filter can be pushed down completely), or I filter complete partitions by adjusting the timestamps to UTC. But in either case I see post-scan filters and no aggregate pushdown, although I do see the following in the log. Please let me know what I am missing here.

"Evaluating completely on Iceberg side: IsNotNull(initial_page_view_dtm)"

23/10/16 06:36:08 INFO SparkScanBuilder: Evaluating completely on Iceberg side: IsNotNull(initial_page_view_dtm)
23/10/16 06:36:08 INFO V2ScanRelationPushDown:
Pushing operators to spark_catalog.schema.table1
Pushed Filters: IsNotNull(initial_page_view_dtm), GreaterThanOrEqual(initial_page_view_dtm,2023-06-02 06:00:00.0), LessThanOrEqual(initial_page_view_dtm,2023-06-02 08:59:59.0)
Post-Scan Filters: (initial_page_view_dtm#3 >= 2023-06-02 06:00:00),(initial_page_view_dtm#3 <= 2023-06-02 08:59:59)

23/10/16 06:36:08 INFO V2ScanRelationPushDown:
Output: pageviewdate#0, initial_page_view_dtm#3

23/10/16 06:36:09 INFO SnapshotScan: Scanning table spark_catalog.schema.table1 snapshot 3251312493606204579 created at 2023-10-05T08:25:16.490+00:00 with filter ((initial_page_view_dtm IS NOT NULL AND initial_page_view_dtm >= (16-digit-int)) AND initial_page_view_dtm <= (16-digit-int))
23/10/16 06:36:09 INFO LoggingMetricsReporter: Received metrics report: ScanReport{tableName=spark_catalog.schema.table1, snapshotId=3251312493606204579, filter=((not_null(ref(name="initial_page_view_dtm")) and ref(name="initial_page_view_dtm") >= "(16-digit-int)") and ref(name="initial_page_view_dtm") <= "(16-digit-int)"), schemaId=0, projectedFieldIds=[1, 4], projectedFieldNames=[pageviewdate, initial_page_view_dtm], scanMetrics=ScanMetricsResult{totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.383991592S, count=1}, resultDataFiles=CounterResult{unit=COUNT, value=1}, resultDeleteFiles=CounterResult{unit=COUNT, value=0}, totalDataManifests=CounterResult{unit=COUNT, value=68}, totalDeleteManifests=CounterResult{unit=COUNT, value=0}, scannedDataManifests=CounterResult{unit=COUNT, value=1}, skippedDataManifests=CounterResult{unit=COUNT, value=67}, totalFileSizeInBytes=CounterResult{unit=BYTES, value=340185692}, totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=0}, skippedDataFiles=CounterResult{unit=COUNT, value=30}, skippedDeleteFiles=CounterResult{unit=COUNT, value=0}, scannedDeleteManifests=CounterResult{unit=COUNT, value=0}, skippedDeleteManifests=CounterResult{unit=COUNT, value=0}, indexedDeleteFiles=CounterResult{unit=COUNT, value=0}, equalityDeleteFiles=CounterResult{unit=COUNT, value=0}, positionalDeleteFiles=CounterResult{unit=COUNT, value=0}}, metadata={engine-version=3.3.1, iceberg-version=Apache Iceberg 1.3.0 (commit 7dbdfd33a667a721fbb21c7c7d06fec9daa30b88), app-id=application_1689900894764_104752, engine-name=spark}}

@huaxingao
Contributor Author

@atifiu Based on the log, only IsNotNull(initial_page_view_dtm) is completely evaluated on the Iceberg side. Both (initial_page_view_dtm#3 >= 2023-06-02 06:00:00) and (initial_page_view_dtm#3 <= 2023-06-02 08:59:59) are still evaluated on the Spark side; that's why the aggregates can't be pushed down.

@atifiu

atifiu commented Oct 16, 2023

@huaxingao So finally it is working, but only without the between and <= operators. Yes, I had to tweak my query to adjust for the timezone so that entire partitions are picked by the query.

23/10/16 13:03:17 INFO SparkScanBuilder: Evaluating completely on Iceberg side: IsNotNull(page_view_dtm)
23/10/16 13:03:17 INFO SparkScanBuilder: Evaluating completely on Iceberg side: GreaterThanOrEqual(page_view_dtm,2022-12-31 19:00:00.0)
23/10/16 13:03:17 INFO SparkScanBuilder: Evaluating completely on Iceberg side: LessThan(page_view_dtm,2023-01-01 19:00:00.0)
23/10/16 13:03:17 INFO V2ScanRelationPushDown:
Pushing operators to spark_catalog.schema.table1
Pushed Filters: IsNotNull(page_view_dtm), GreaterThanOrEqual(page_view_dtm,2022-12-31 19:00:00.0), LessThan(page_view_dtm,2023-01-01 19:00:00.0)
Post-Scan Filters:

@atifiu

atifiu commented Oct 17, 2023

@amogh-jahagirdar I think I know how these delete files were generated even though copy-on-write is defined at the table level. I executed the delete from Trino, and since it only supports merge-on-read, these delete files were generated. Now I am finding it really difficult to remove these delete files. I will try removing the partition to see whether they get removed.
